package subpub

import (
	"fmt"
	"gitee.com/ymofen/gobase"
	"sync"
)

// PubSessionIntf is the set of methods every session object created through
// the hub must implement. PubHub type-asserts factory results against this
// interface before storing them.
type PubSessionIntf interface {
	// Start is invoked by the hub once the session has been registered.
	Start()
	// Close is invoked by the hub when the session is removed.
	Close() error

	// Pub publishes data on the given topic.
	// NOTE(review): the meaning of max and of the returned int is defined by
	// the implementations, which are not visible here — presumably a delivery
	// limit and a delivered count; confirm against the plugins.
	Pub(topic string, max int, args ...interface{}) int
}

// PubHub is the publish hub.
// It is responsible for creating and managing session objects.
// Factory methods used to create sessions must be registered under a name
// starting with "pub.".
type PubHub struct {
	// notify is handed to every created session and is also used to
	// subscribe/unsubscribe the per-session notification callback.
	notify ISubPub
	// lk guards lstMap.
	lk     sync.RWMutex
	// lstMap holds the created session objects keyed by session id.
	lstMap map[string]interface{}
}

// NewPubHub builds a hub with an empty session table and a fresh
// notification channel obtained from NewSubchannel.
func NewPubHub() *PubHub {
	hub := new(PubHub)
	hub.lstMap = make(map[string]interface{})
	hub.notify = NewSubchannel()
	return hub
}

// DefaultPub is the package-level hub instance, created at package
// initialization.
var DefaultPub = NewPubHub()

// AddSession creates a publish session and registers it under sessionid.
//
// The "conntype" entry of conf selects the factory: the factory registered as
// "pub.<conntype>" is invoked with (sessionid, conf, hub notifier). On success
// notifyFn is subscribed on the hub notifier under sessionid, the session is
// started, and the session is returned.
//
// An error is returned (and pub is nil) when:
//   - a session with the same sessionid already exists;
//   - conf does not specify "conntype";
//   - the factory call fails (e.g. the type is not registered);
//   - the created object does not implement PubSessionIntf.
func (this *PubHub) AddSession(sessionid string, conf gobase.StrMap, notifyFn SubFunc) (pub PubSessionIntf, err error) {
	// Cheap early rejection so no plugin instance is built for an id that is
	// already known to be taken.
	this.lk.RLock()
	existing := this.lstMap[sessionid]
	this.lk.RUnlock()
	if existing != nil {
		return nil, fmt.Errorf("[%s]已经存在", sessionid)
	}

	typestr := conf.StringByName("conntype", "")
	if len(typestr) == 0 {
		return nil, fmt.Errorf("未指定conntype")
	}
	typestr = fmt.Sprintf("pub.%s", typestr)

	// The factory is called without holding the lock so that a slow or
	// re-entrant plugin constructor cannot block or deadlock the hub.
	var created interface{}
	created, err = gobase.CreateFactoryInstance(typestr, sessionid, conf, this.notify)
	if err != nil {
		return nil, err
	}
	intf, ok := created.(PubSessionIntf)
	if !ok {
		return nil, fmt.Errorf("[%s]类型插件不支持PubSessionIntf接口", typestr)
	}

	// Re-check under the write lock: another goroutine may have registered the
	// same id while the factory was running. Without this check the earlier
	// session would be silently overwritten and never closed.
	this.lk.Lock()
	if this.lstMap[sessionid] != nil {
		this.lk.Unlock()
		// Best-effort cleanup of the instance that lost the race; the
		// duplicate-id error is the one worth reporting to the caller.
		_ = intf.Close()
		return nil, fmt.Errorf("[%s]已经存在", sessionid)
	}
	this.lstMap[sessionid] = created
	this.lk.Unlock()

	this.notify.Sub(sessionid, sessionid, notifyFn)
	intf.Start()
	return intf, nil
}

// DelSession closes and removes the session registered under sessionid, then
// unsubscribes the notification callback registered for it.
//
// The return value is whatever the hub notifier's Unsub reports for
// (sessionid, sessionid); Unsub is attempted even when no session was stored.
func (this *PubHub) DelSession(sessionid string) bool {
	// Look up and remove in a single critical section so that concurrent
	// DelSession calls cannot both obtain (and double-close) the same session,
	// and so that Pub can no longer reach a session that is being closed.
	this.lk.Lock()
	obj := this.lstMap[sessionid]
	delete(this.lstMap, sessionid)
	this.lk.Unlock()

	// Close outside the lock: the plugin's Close may block or call back into
	// the hub. The comma-ok assertion is false for a missing (nil) entry.
	if intf, ok := obj.(PubSessionIntf); ok {
		// The bool result of this method cannot carry the error; closing is
		// best-effort and removal proceeds regardless.
		_ = intf.Close()
	}
	return this.notify.Unsub(sessionid, sessionid)
}

// Pub forwards a publish request to the session registered under sessionid.
//
// It returns the value produced by the session's own Pub method, or 0 when no
// session is stored under that id or the stored object does not implement
// PubSessionIntf.
func (this *PubHub) Pub(sessionid string, topic string, max int, args ...interface{}) int {
	this.lk.RLock()
	entry := this.lstMap[sessionid]
	this.lk.RUnlock()

	// The comma-ok assertion also covers the missing-entry case: asserting on
	// a nil interface value yields ok == false.
	session, ok := entry.(PubSessionIntf)
	if !ok {
		return 0
	}
	return session.Pub(topic, max, args...)
}
